-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Stop copying LogicalPlan and Exprs in TypeCoercion
(10% faster planning)
#10356
Conversation
@@ -467,6 +468,200 @@ impl LogicalPlan { | |||
self.with_new_exprs(self.expressions(), inputs.to_vec()) | |||
} | |||
|
|||
/// Recomputes schema and type information for this LogicalPlan if needed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe this is a new API for using TreeNode
to rewrite plans in ways that change the schema.
This effectively factors out the recalculation part of LogicalPlan::new_with_exprs
I tried to find a way to use reuse this logic in LogicalPlan::new_with_exprs
but was not able to without forcing (another) clone
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FYI @peter-toth I suspect you may need something like this for common subexpression elimination / #9873
TypeCoercion
TypeCoercion
(10% faster planning)
.map_data(|expr| original_name.restore(expr)) | ||
})? | ||
// coerce join expressions specially | ||
.map_data(|plan| expr_rewrite.coerce_joins(plan))? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since expr_rewrite.coerce_joins(plan)
can change the plan, shouldn't its result be Result<Transformed<LogicalPlan>>
? And then here we should probably use map_transformed()
instead of the current map_data()
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for anyone following along, the response is https://github.com/apache/datafusion/pull/10356/files#r1588998665 (tldr should do as a follow on PR)
// coerce join expressions specially | ||
.map_data(|plan| expr_rewrite.coerce_joins(plan))? | ||
// recompute the schema after the expressions have been rewritten as the types may have changed | ||
.map_data(|plan| plan.recompute_schema()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we always need to run plan.recompute_schema()
? If the Transformed<LogicalPlan>
's .transformed
is false then probably we don't need to.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an excellent point. At the moment, I think we do need to always run recompute_schema because the TypeCoercionRewriter
doesn't return Transformed
(and thus we don't know if any actual expression coercion was done, so we have to assume it was).
I filed #10365 to track improving this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, I think you use TypeCoercionRewriter
in expr.rewrite(&mut expr_rewrite)?
and that rewrite()
returns Transformed<Expr>
and then that Transformed<Expr>
is propagated up into plan.map_expressions()
, that returns Transformed<LogicalPlan>
. So you have the necessary Transformed
to decide if recompute_schema()
is needed. Or not? 🙂
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are correct (of course!) thank you for pointing it out. Now that analyze_internal
returns Transformed would work. However, there is still code like this:
let new_plan =
analyze_internal(self.schema, unwrap_arc(subquery.subquery))?.data;
Ok(Transformed::yes(Expr::Exists(Exists {
subquery: Subquery {
subquery: Arc::new(new_plan),
outer_ref_columns: subquery.outer_ref_columns,
},
negated,
})))
}
Which discards the transformed information (and in this case always returns Transformed::true).
In order to keep the PRs small and easier to review I would like to not change this PR (it is no worse than main
in regards to recomputing schema) and I will make a follow on PR to avoid recomputing schema when unecessary
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah ok, it seems there are many unnecessary Transformed::yes
s in the current code. But false positive transformed
s doesn't cause any issue...
Sure, a follow-up PR sounds good, I agree that this PR already looks really nice!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here is my draft followup: #10369
It is quite large (it requires updating the entire expression rewriter) so I am glad we left it in a separate PR
// get schema representing all available input fields. This is used for data type | ||
// resolution only, so order does not matter here | ||
let mut schema = merge_schema(new_inputs.iter().collect()); | ||
let mut schema = merge_schema(plan.inputs()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 q
.map(|(lhs, rhs)| { | ||
// coerce the arguments as though they were a single binary equality | ||
// expression | ||
let (lhs, rhs) = self.coerce_binary_op(lhs, Operator::Eq, rhs)?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure if this method needed, as it looks like we just cast lhs, rhs? it feels it can be simplified?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think coerce_binary_op
is different than just casting lhs
and rhs
as it first calls get_input_types
:
let (left_type, right_type) = get_input_types(
&left.get_type(self.schema)?,
&op,
&right.get_type(self.schema)?,
)?;
And get_input_types
usese the comparison coercion rules to figure out a common set if types to coerce lhs
and rhs
to.
Co-authored-by: Oleks V <[email protected]>
…o alamb/type_coercion
@comphead I think this PR is ready to go. Would you be willing to approve it? Or are there other comments you would like to see addressed? |
/// For example, on_exprs like `t1.a = t2.b AND t1.x = t2.y` will be stored | ||
/// as a list of `(t1.a, t2.b), (t1.x, t2.y)` | ||
fn coerce_joins(&mut self, plan: LogicalPlan) -> Result<LogicalPlan> { | ||
let LogicalPlan::Join(mut join) = plan else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thats an interesting syntax
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it checks the plan can be deconstructed into LogicalPlan::Join(...) and if its not the else branch is triggered?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that is exactly right. It is one of my favorite Rust syntax's as it often can avoid a level of indenting
https://doc.rust-lang.org/rust-by-example/flow_control/let_else.html
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm thanks @alamb!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @comphead 🙏
/// For example, on_exprs like `t1.a = t2.b AND t1.x = t2.y` will be stored | ||
/// as a list of `(t1.a, t2.b), (t1.x, t2.y)` | ||
fn coerce_joins(&mut self, plan: LogicalPlan) -> Result<LogicalPlan> { | ||
let LogicalPlan::Join(mut join) = plan else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that is exactly right. It is one of my favorite Rust syntax's as it often can avoid a level of indenting
https://doc.rust-lang.org/rust-by-example/flow_control/let_else.html
…ning) (apache#10356) * Add `LogicalPlan::recompute_schema` for handling rewrite passes * Stop copying LogicalPlan and Exprs in `TypeCoercion` * Apply suggestions from code review Co-authored-by: Oleks V <[email protected]> --------- Co-authored-by: Oleks V <[email protected]>
Note it has code from #10410 so that might good to review firstWhich issue does this PR close?
Closes #10210
Part of #9637 -- let's make DataFusion planning faster by not copying so much
Rationale for this change
Now that we have the nice TreeNode API thanks to #8913 and @peter-toth let's use it to both simplify the code and avoid copies
What changes are included in this PR?
TypeCoercion
via TreeNode APILogicalPlan::recompute_schema
to recompute the schema after expressions in a plan are changedAre these changes tested?
Existing CI
Are there any user-facing changes?
Faster planning:
Details